%matplotlib inline
%load_ext autoreload
%autoreload 2
import pandas as pd
import fastai
from fastai.vision import *
from fastai.callbacks import *
from fastai.utils.mem import *
import tensorflow as tf
from torchvision.models import vgg16_bn
import torchvision.transforms as transforms
from skimage import measure
from modified_model_fx import *
import warnings
warnings.filterwarnings('ignore')
trans1 = transforms.ToTensor()
trans = transforms.ToPILImage()
Set CUDA:
torch.cuda.set_device(3)
Input images:
Paths to data:
path = Path('../../../../../SCRATCH2/marvande/data/train/HR/')
# path to HR data:
path_hr = path / 'HR_patches_train/tiff_files'
# path where we create the LR data:
path_lr = path / 'small-250/train'
# path to MR data (of same size as HR):
path_mr = path / 'small-502/train'
# path to original LR data:
path_lr_or = path / 'HR_patches_resized/jpg_images'
assert path.exists(), f"need dataset @ {path}"
assert path_hr.exists()
Have a look at what type of data is in those folders. First, we look at our HR images from path_hr.
il = ImageList.from_folder(path_hr)
PIL.Image.open(
'../../../../../SCRATCH2/marvande/data/train/HR/HR_patches_train/tiff_files/0124_[47360,11368]_part_1_1_.tif'
)
From this HR data, we create LR versions in path_lr and path_mr for training. LR images will be of size (3, 250, 334) while MR images of the same size as the HR but of lower quality (MR images are used in the second phase of training), thus (3, 502, 672).
def resize_one(fn, i, path, size):
    """Resize one HR image to `size`, lower its quality, and save it under `path`.

    The destination mirrors the file's location relative to the module-level
    `path_hr`. `i` is the index supplied by fastai's `parallel` (unused).
    Saving with `quality=60` degrades the image so it can serve as a
    low-resolution training input.
    """
    target = path / fn.relative_to(path_hr)
    target.parent.mkdir(parents=True, exist_ok=True)
    resized = PIL.Image.open(fn).resize(size, resample=PIL.Image.BICUBIC)
    resized.convert('RGB').save(target, quality=60)
# create smaller image sets of lower quality the first time this nb is run:
# sizes are (width, height) tuples, as expected by PIL.Image.resize
sets = [(path_lr, (334, 250)), (path_mr, (672, 502))]
for p, size in sets:
    # only generate each derived set once; skipped on later runs
    if not p.exists():
        print(f"resizing to {size} into {p}")
        parallel(partial(resize_one, path=p, size=size), il.items)
Have a look at the new LR and MR images and their sizes.
print('LR:')
print(ImageList.from_folder(path_lr))
print('MR:')
print(ImageList.from_folder(path_mr))
As the model's first phase is on images of the LR size, we create the first batch of training data of size (3, 250, 334). So HR images are downsized to (3, 250, 334) and both HR and LR are transformed using several transformations (see modified_model_fx.py). Those data augmentation techniques help avoid overfitting during training.
# set image size and batch size to which data is transformed:
bs, size = 15, (250, 334)
arch = models.resnet34
src = ImageImageList.from_folder(path_lr).split_by_rand_pct(0.1, seed=42)
Change statistics.
def get_data(bs, size):
    """Build a fastai DataBunch pairing LR inputs with their HR targets.

    Each LR image in `src` is labelled with the HR file at the same
    relative path. `tfms` is applied to inputs and (via `tfm_y=True`)
    to targets, everything is resized to `size`, batched with `bs`,
    and normalized (targets included). `c = 3` marks three output
    channels (RGB).
    """
    labelled = src.label_from_func(
        lambda lr_file: path_hr / lr_file.relative_to(path_lr))
    transformed = labelled.transform(tfms, size=size, tfm_y=True)
    bunch = transformed.databunch(bs=bs).normalize(do_y=True)
    bunch.c = 3
    return bunch
data = get_data(bs, size)
data
Have a look at a few examples from the validation data. On the left are the LR images and on the right the corresponding HR images.
# lr
x = data.one_batch(ds_type=DatasetType.Valid)[0][4]
# hr
y = data.one_batch(ds_type=DatasetType.Valid)[1][4]
print('LR data shape {} and HR shape {}'.format(list(x.shape), list(y.shape)))
MSE = mse_mult_chann(x.numpy(), y.numpy())
NMSE = measure.compare_nrmse(x.numpy(), y.numpy())
SSIM = ssim_mult_chann(x.numpy(), y.numpy())
print('MSE: {}, NMSE: {}, SSIM: {}'.format(MSE, NMSE, SSIM))
plot_single_image(x, '', (10,10))
plot_single_image(y, '', (10,10))
data.show_batch(ds_type=DatasetType.Valid, rows=2, figsize=(20, 20))
Create loss metrics used in the model.
def gram_matrix(x):
    """Return the normalized Gram matrix of each item in batch `x`.

    `x` has shape (n, c, h, w). Each item's channel maps are flattened
    to vectors; the matrix of their pairwise inner products is divided
    by c*h*w, giving an (n, c, c) tensor.
    """
    n, c, h, w = x.size()
    flat = x.reshape(n, c, h * w)
    return torch.bmm(flat, flat.transpose(1, 2)) / (c * h * w)
Select the data of an image and use it to compute its Gram matrix.
im = data.valid_ds[0][1]
t = im.data
t = torch.stack([t,t])
gram_matrix(t)
We define a base loss as the L1 loss (F.l1_loss).
base_loss = F.l1_loss
Construct a pre-trained vgg16 model (Very Deep Convolutional Networks for Large-Scale Image Recognition). VGG is another Convolutional Neural Network (CNN) architecture devised in 2014; the 16-layer version is utilised in the loss function for training this model. The VGG model, a network pretrained on ImageNet, is used to evaluate the generator model’s loss.
Further, we set requires_grad to False as this is useful when you want to freeze part of your model, or you know in advance that you are not going to use gradients w.r.t. some parameters.

The head of the VGG model is the final layers shown as fully connected and softmax in the above diagram. This head is ignored and the loss function uses the intermediate activations in the backbone of the network, which represent the feature detections.

Those activations can be found by looking through the VGG model to find all the max pooling layers. These are where the grid size changes and features are detected. So we need to select those layers.
vgg_m = vgg16_bn(True).features.cuda().eval()
requires_grad(vgg_m, False)
Select the layer IDs of MaxPool2d blocks:
blocks = [i-1 for i,o in enumerate(children(vgg_m)) if isinstance(o,nn.MaxPool2d)]
blocks, [vgg_m[i] for i in blocks]
Create the feature loss from the model and layer ids selected above.
Source:
Main points:
Predictions from models trained with this loss function: The generated predictions from trained models using loss functions based on these techniques have both convincing fine detail and style. That style and fine detail may be different aspects of image quality be predicting fine pixel detail or the predicting correct colours.
class FeatureLoss(nn.Module):
    """Perceptual (feature + style) loss driven by a pretrained feature extractor.

    Combines a pixel-level `base_loss` with losses on intermediate
    activations of `m_feat` at `layer_ids` (feature loss) and on the Gram
    matrices of those activations (style loss), weighted by `layer_wgts`.
    Per-component values are stored in `self.metrics` so fastai's
    `LossMetrics` callback can report them.
    """

    def __init__(self, m_feat, layer_ids, layer_wgts):
        super().__init__()
        self.m_feat = m_feat
        # Layers whose activations are compared between input and target.
        self.loss_features = [self.m_feat[i] for i in layer_ids]
        # Hooks capture those activations on every forward pass;
        # detach=False keeps them in the autograd graph.
        self.hooks = hook_outputs(self.loss_features, detach=False)
        self.wgts = layer_wgts
        self.metric_names = ['pixel',] + [f'feat_{i}' for i in range(len(layer_ids))
              ] + [f'gram_{i}' for i in range(len(layer_ids))]

    def make_features(self, x, clone=False):
        # Run x through the feature network purely for the hooked activations.
        self.m_feat(x)
        return [(o.clone() if clone else o) for o in self.hooks.stored]

    def forward(self, input, target):
        # Target features are cloned because the second forward pass (for
        # `input`) overwrites the activations stored in the hooks.
        out_feat = self.make_features(target, clone=True)
        in_feat = self.make_features(input)
        # Pixel-level term.
        self.feat_losses = [base_loss(input,target)]
        #feat losses
        self.feat_losses += [base_loss(f_in, f_out)*w
                             for f_in, f_out, w in zip(in_feat, out_feat, self.wgts)]
        #gram: style term; w**2 * 5e3 rescales Gram losses to a comparable magnitude
        self.feat_losses += [base_loss(gram_matrix(f_in), gram_matrix(f_out))*w**2 * 5e3
                             for f_in, f_out, w in zip(in_feat, out_feat, self.wgts)]
        # Expose each component by name for the LossMetrics callback.
        self.metrics = dict(zip(self.metric_names, self.feat_losses))
        return sum(self.feat_losses)

    def __del__(self): self.hooks.remove()
feat_loss = FeatureLoss(vgg_m, blocks[2:5], [5,15,2])
lr = 1e-3
def do_fit(save_name, lrs=slice(lr), pct_start=0.9):
    """Train the module-level `learn` for 10 one-cycle epochs, save, show samples.

    NOTE(review): the default `lrs` captures the module-level `lr` at
    definition time; reassigning `lr` later does not change the default.
    Weights are saved under `save_name` in the data path's models folder.
    """
    learn.fit_one_cycle(10, lrs, pct_start=pct_start)
    learn.save(save_name, return_path=True)
    learn.show_results(rows=1, imgsize=10)
# delete all tensors and free cache:
# NOTE(review): `del obj` only unbinds the loop variable; gc.get_objects()
# results and other referents still hold the tensors, so this loop frees
# nothing by itself. The effective work is empty_cache() + gc.collect().
for obj in gc.get_objects():
    if torch.is_tensor(obj):
        del obj
torch.cuda.empty_cache()
gc.collect()
#learn.destroy()
#get free memory (in MBs) for the currently selected gpu id, after emptying the cache
print(
    'free memory (in MBs) for the currently selected gpu id, after emptying the cache: ',
    gpu_mem_get_free_no_cache())
print(
    'used memory (in MBs) for the currently selected gpu id, after emptying the cache:',
    gpu_mem_get_used_no_cache())
gpu_mem_get_all()
wd = 1e-3
learn = unet_learner(data,
arch,
wd=wd,
loss_func=feat_loss,
callback_fns=LossMetrics,
blur=True,
norm_type=NormType.Weight)
# garbage collection:
gc.collect()
learn.lr_find()
learn.recorder.plot()
lr = 1e-3
do_fit('1a_mod_norm', slice(lr * 10))
! cp ../../../../../SCRATCH2/marvande/data/train/HR/small-250/train/models/1a_mod_norm.pth ../../../../../SCRATCH2/marvande/data/train/HR/models/
learn.unfreeze()
learn.load('1a_mod_norm');
do_fit('1b_mod_norm', slice(1e-5,lr))
! cp ../../../../../SCRATCH2/marvande/data/train/HR/small-250/train/models/1b_mod_norm.pth ../../../../../SCRATCH2/marvande/data/train/HR/models/
torch.cuda.empty_cache()
gc.collect()
#learn.destroy()
#get free memory (in MBs) for the currently selected gpu id, after emptying the cache
print(
'free memory (in MBs) for the currently selected gpu id, after emptying the cache: ',
gpu_mem_get_free_no_cache())
print(
'used memory (in MBs) for the currently selected gpu id, after emptying the cache:',
gpu_mem_get_used_no_cache())
gpu_mem_get_all()
new_size = (502, 672)
bs = 4
data = get_data(bs, new_size)
wd = 1e-3
learn = unet_learner(
data,
arch,
wd=wd,
loss_func=feat_loss,
callback_fns=LossMetrics,
blur=True,
norm_type=NormType.Weight,)
lr = 1e-3
# garbage collection:
gc.collect()
data
new_size = (502, 672)
bs = 4
data = get_data(bs, new_size)
# lr
x = data.one_batch(ds_type=DatasetType.Valid)[0][1]
# hr
y = data.one_batch(ds_type=DatasetType.Valid)[1][1]
print('LR data shape {} and HR shape {}'.format(list(x.shape), list(y.shape)))
MSE = mse_mult_chann(x.numpy(), y.numpy())
NMSE = measure.compare_nrmse(x.numpy(), y.numpy())
SSIM = ssim_mult_chann(x.numpy(), y.numpy())
print('MSE: {}, NMSE: {}, SSIM: {}'.format(MSE, NMSE, SSIM))
plot_single_image(x, '', (10,10))
plot_single_image(y, '', (10,10))
learn.data = data
learn.freeze()
gc.collect()
learn.load('1b_mod_norm');
do_fit('2a_mod_norm')
! cp ../../../../../SCRATCH2/marvande/data/train/HR/small-250/train/models/2a_mod_norm.pth ../../../../../SCRATCH2/marvande/data/train/HR/models/
learn.load('2a_mod_norm');
do_fit('2b_mod_norm', slice(1e-6,1e-4), pct_start=0.3)
! cp ../../../../../SCRATCH2/marvande/data/train/HR/small-250/train/models/2b_mod_norm.pth ../../../../../SCRATCH2/marvande/data/train/HR/models/
Reconstruct the learn object, this time with l1_loss and not with feature_loss, why?
PSNR ? frechet inception distance FID score ?
learn = None
gc.collect()
learn = unet_learner(data,
arch,
loss_func=F.l1_loss,
blur=True,
norm_type=NormType.Weight)
Set the sizes of the LR input images (size_lr) for testing and the HR images (size_mr).
# path to test images:
path_test = path/'HR_patches_test/cut_images/jpeg_images/'
# path to save LR and MR test images:
path_lr_test = path / 'small-250/test'
path_mr_test = path / 'small-502/test'
#Check free GPU RAM:
free = gpu_mem_get_free_no_cache()
print(f"using size={size}, have {free} MB of GPU RAM free")
! rm -r '../../../../../SCRATCH2/marvande/data/train/HR/small-250/test'
! rm -r '../../../../../SCRATCH2/marvande/data/train/HR/small-502/test'
def resize_test(fn, i, path, size):
    """resize_test: resizes a test image to `size` and saves it in `path`;
    quality is lowered so as to get LR images.

    The destination mirrors the file's location relative to the module-level
    `path_test`. `i` is the index passed by fastai's `parallel` (unused).
    """
    dest = path / fn.relative_to(path_test)
    dest.parent.mkdir(parents=True, exist_ok=True)
    img = PIL.Image.open(fn)
    img = img.resize(size, resample=PIL.Image.BICUBIC).convert('RGB')
    img.save(dest, quality=60)
il = ImageList.from_folder(path_test)
# create smaller image sets the first time this nb is run:
sets = [(path_lr_test, (334, 250)), (path_mr_test, (672, 502))]
for p, size in sets:
if not p.exists():
print(f"resizing to {size} into {p}")
parallel(partial(resize_test, path=p, size=size), il.items)
Create testing data, data_mr of size HR and data_lr of size LR:
size_mr = (3, 502, 672)
size_lr = (3, 250, 334)
data_mr = (ImageImageList.from_folder(path_mr_test).split_by_rand_pct(0.1, seed=42)
.label_from_func(lambda x: path_test/x.name)
.transform(tfms, size=size_mr, tfm_y=True)
.databunch(bs=1).normalize(do_y=True))
data_mr.c = 3
data_lr = (ImageImageList.from_folder(path_lr_test).split_by_rand_pct(0.1, seed=42)
.label_from_func(lambda x: path_test/x.name)
.transform(tfms, size=size_lr, tfm_y=True)
.databunch(bs=1).normalize(do_y=True))
data_lr.c = 3
data_hr = (ImageImageList.from_folder(path_test).split_by_rand_pct(0.1, seed=42)
.label_from_func(lambda x: path_test/x.name)
.transform(tfms, size=size_mr, tfm_y=True)
.databunch(bs=1).normalize(do_y=True))
data_hr.c = 3
data_lr, data_mr, data_hr
Load the learn object from the last phase (2b_mod_norm) and set its data to data_mr (why?).
learn.load('2b_mod_norm')
# Here have to put mr if we want output the same size as HR:
learn.data = data_mr
Select the images we are going to use for testing from data_mr and data_lr:
# Ground truth HR image:
mr = data_mr.valid_ds.x.items[1]
im_mr = open_image(mr)
hr = data_mr.valid_ds.y.items[1]
#hr = data_hr.valid_ds.x.items[1]
im_HR_gt = open_image(hr)
# LR version of the same image:
lr = data_lr.valid_ds.x.items[1]
im_lr = open_image(lr)
# Check it's the same image:
mr, lr, hr
To be able to apply the model and predict the HR image from the LR, we need to resample the LR image to be of the same size as the HR, as the model inputs and outputs images of the same size.
# resample to same size as HR:
# for pytorch, have to add a first new dimension to indicate bs = 1
# now lr_data of shape [1, 3, 250, 334]
im_lr = im_lr.data.unsqueeze(0)
lr_resized = torch.nn.functional.interpolate(im_lr, size_mr[1:],
mode='bicubic')
# remove the previous added dimension
lr_resized = lr_resized.squeeze()
input_image = lr_resized
# plot resized LR:
plot_single_image(lr_resized,
'Resized LR from size {} to size {}'\
.format(list(im_lr.squeeze().shape),
list(lr_resized.shape)), (10,10))
Now that we have resized the LR to the same size as the ground truth HR, we can feed it to the model and predict a new HR image.
#print('HR ground thruth shape: {}'.format(list(im_HR_gt.shape)))
# Prediction of model:
p, img_pred, b = learn.predict(Image(input_image))
print('Predicted HR shape: {}'.format(list(p.shape)))
# Assert reconstructed HR has same shape as ground truth HR:
assert(list(p.shape) == list(im_HR_gt.shape))
Compare this predicted image to the ground truth HR and LR:
plot_3_images(
trans1(trans(im_mr.data)).numpy(),
trans1(trans(im_HR_gt.data)).numpy(), img_pred, (20, 20))
print('PRed')
print(np.max(img_pred.numpy()))
print(np.min(img_pred.numpy()))
print('Ground truth HR')
print(np.max(trans1(trans(im_HR_gt.data)).numpy()))
print(np.min(trans1(trans(im_HR_gt.data)).numpy()))
print('LR bic')
print(np.max(input_image.numpy()))
print(np.min(input_image.numpy()))
# Bounds of the ground-truth HR image, used to clip the prediction.
# (Removed two leftover bare `min(max_HR, ...)` / `max(min_HR, ...)`
# expressions: they referenced max_HR/min_HR before assignment — a
# NameError in a fresh run — and discarded their results anyway.)
max_HR = np.max(trans1(trans(im_HR_gt.data)).numpy())
min_HR = np.min(trans1(trans(im_HR_gt.data)).numpy())
# Clip the predicted image into the ground-truth intensity range.
test_im = np.clip(a_max=max_HR, a_min=min_HR, a=img_pred.numpy())
# Compare MSE before vs. after clipping:
print(mse_mult_chann(img_pred.numpy(), trans1(trans(im_HR_gt.data)).numpy()))
mse_mult_chann(test_im, trans1(trans(im_HR_gt.data)).numpy())
compare_images_metrics(img_pred.numpy(),
trans1(trans(im_HR_gt.data)).numpy(),
lr_resized.numpy(), '')
## Plot for several:
LR_list = ImageList.from_folder(path_lr_test).items
HR_list = ImageList.from_folder(path_test).items
phenotypes = {'0': 'CD4', '1': 'CK','2': 'DAPI', '3': 'CD3', '4': 'FoxP3', '5': 'CD8'}
for i in range(len(LR_list[0:5])):
lr_data = open_image(LR_list[i])
# get file string
pattern = "test\/(.*?)\.jpg"
substring = re.search(pattern, str(LR_list[i])).group(1)
file_name = substring+'.jpg'
# get patient number:
pattern = "test\/(.*?)_"
patient = re.search(pattern, str(LR_list[i])).group(1)
# get phenotype number:
pattern2 = "_\-(.*?)\.jpg"
phenotype_numb = re.search(pattern2,file_name).group(1)
phenotype = phenotypes[phenotype_numb]
# get location number:
pattern2 = "\[(.*?)\]"
location = re.search(pattern2,file_name).group(1)
# resample to same size as HR:
# for pytorch, have to add a first new dimension to indicate bs = 1
# now lr_data of shape [1, 3, 250, 334]
lr_data = lr_data.data.unsqueeze(0)
lr_resized = torch.nn.functional.interpolate(lr_data, size_mr[1:],
mode='bicubic')
# remove the previous added dimension
lr_resized = lr_resized.squeeze()
# corresponding ground truth HR:
im_HR_gt = open_image(HR_list[i])
#print('HR ground thruth shape: {}'.format(list(im_HR_gt.shape)))
# Prediction of model:
p, img_pred, b = learn.predict(Image(lr_resized))
#print('Reconstructed HR shape: {}'.format(list(p.shape)))
# Assert reconstructed HR has same shape as ground truth HR:
assert(list(p.shape) == list(im_HR_gt.shape))
gt_HR_nd = trans1(trans(im_HR_gt.data)).numpy()
pred_HR_nd = img_pred.numpy()
compare_images_metrics(pred_HR_nd,gt_HR_nd, lr_resized.numpy(),'phenotype: {}, patient: {}, location: [{}]'.format(phenotype, patient, location))
# Create test table with results:
def testing_images(path_lr, path_hr, show_im = False):
phenotypes = {'0': 'CD4', '1': 'CK','2': 'DAPI', '3': 'CD3', '4': 'FoxP3', '5': 'CD8'}
MSE,NMSE, SSIM = [], [],[]
LR_list = ImageList.from_folder(path_lr).items
HR_list = ImageList.from_folder(path_hr).items
file_names, locations, patients, phenotypes_list = [], [], [], []
LR_MSE, LR_NMSE, LR_SSIM= [], [], []
for i in range(len(LR_list[0:20])):
lr_data = open_image(LR_list[i])
pattern = "test\/(.*?)\.jpg"
substring = re.search(pattern, str(LR_list[i])).group(1)
file_names.append(substring+'.jpg')
file_name = substring+'.jpg'
# get patient number:
pattern = "test\/(.*?)_"
patients.append(re.search(pattern, str(LR_list[i])).group(1))
# get phenotype number:
pattern2 = "_\-(.*?)\.jpg"
phenotype_numb = re.search(pattern2,file_name).group(1)
phenotypes_list.append(phenotypes[phenotype_numb])
# get location number:
pattern3 = "\[(.*?)\]"
locations.append('[{}]'.format(re.search(pattern3,file_name).group(1)) )
# resample to same size as HR:
# for pytorch, have to add a first new dimension to indicate bs = 1
# now lr_data of shape [1, 3, 250, 334]
lr_data = lr_data.data.unsqueeze(0)
lr_resized = torch.nn.functional.interpolate(lr_data, size_mr[1:],
mode='bicubic')
# remove the previous added dimension
lr_resized = lr_resized.squeeze()
# corresponding ground truth HR:
im_HR_gt = open_image(HR_list[i])
#print('HR ground thruth shape: {}'.format(list(im_HR_gt.shape)))
# Prediction of model:
p, img_pred, b = learn.predict(Image(lr_resized))
#print('Reconstructed HR shape: {}'.format(list(p.shape)))
# Assert reconstructed HR has same shape as ground truth HR:
assert(list(p.shape) == list(im_HR_gt.shape))
gt_HR_nd = trans1(trans(im_HR_gt.data)).numpy()
pred_HR_nd = img_pred.numpy()
MSE.append(mse_mult_chann(gt_HR_nd, pred_HR_nd))
NMSE.append(measure.compare_nrmse(gt_HR_nd, pred_HR_nd))
SSIM.append(ssim_mult_chann(gt_HR_nd, pred_HR_nd))
LR_MSE.append(mse_mult_chann(lr_resized.numpy(), pred_HR_nd))
LR_NMSE.append(measure.compare_nrmse(lr_resized.numpy(), pred_HR_nd))
LR_SSIM.append(ssim_mult_chann(lr_resized.numpy(), pred_HR_nd))
return pd.DataFrame(data = {'file':file_names,'patient':patients,
'location':locations,
'phenotype':phenotypes_list,
'MSE':MSE,
'NMSE':NMSE, 'SSIM':SSIM,
'LR_MSE': LR_MSE,
'LR_NMSE':LR_NMSE,
'LR_SSIM':LR_SSIM})
df = testing_images(path_lr_test, path_test)
df.to_csv('data/metrics_sim6.csv')
df
av_metrics = pd.DataFrame(index = ['Average and median metrics'],
data={
'avg MSE': np.mean(df['MSE']),
'med MSE': np.median(df['MSE']),
'avg NMSE': np.mean(df['NMSE']),
'med NMSE': np.median(df['NMSE']),
'avg SSIM': np.mean(df['SSIM']),
'med SSIM': np.median(df['SSIM']),
'avg LR_MSE': np.mean(df['LR_MSE']),
'med LR_MSE': np.median(df['LR_MSE']),
'avg LR_NMSE': np.mean(df['LR_NMSE']),
'med LR_NMSE': np.median(df['LR_NMSE']),
'avg LR_SSIM': np.mean(df['LR_SSIM']),
'med LR_SSIM': np.median(df['LR_SSIM'])
})
av_metrics.transpose()
av_metrics.to_csv('data/av_metrics_sim6.csv')
# report average and median MSE, NMSE, SSIM:
fig = plt.figure(figsize = (15, 6))
ax = fig.add_subplot(1, 3, 1)
ax.plot(df['MSE'], '-x', label='MSE prediction vs GT')
ax.plot(df['LR_MSE'],'-x', label = 'MSE interpolated HR vs GT')
ax.axhline(np.median(df['LR_MSE']),color = 'r')
ax.axhline(np.median(df['MSE']),color = 'r')
plt.xlabel('files')
plt.ylabel('MSE')
ax.legend()
ax = fig.add_subplot(1, 3, 2)
ax.plot(df['NMSE'], '-x', label='NMSE prediction vs GT')
ax.plot(df['LR_NMSE'],'-x', label = 'NMSE interpolated HR vs GT')
ax.axhline(np.median(df['LR_NMSE']),color = 'r')
ax.axhline(np.median(df['NMSE']),color = 'r')
plt.xlabel('files')
plt.ylabel('NMSE')
ax.legend()
ax = fig.add_subplot(1, 3, 3)
ax.plot(df['SSIM'], '-x', label='SSIM prediction vs GT')
ax.plot(df['LR_SSIM'],'-x', label = 'SSIM interpolated HR vs GT')
ax.axhline(np.median(df['LR_SSIM']),color = 'r')
ax.axhline(np.median(df['SSIM']),color = 'r')
plt.xlabel('files')
plt.ylabel('SSIM')
ax.legend()
plt.suptitle('Interpolated HR vs GT')
plt.savefig('images/intHRvsGT_sim6.png')